package com.heima.kafka.sample;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @ClassName 生产者
 * @Description TODO
 * @Author 寇铭
 * @Date 2022/8/5 22:05
 * @Version 1.0
 **/
public class ProducerQuickStart {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.kafka配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //发送失败，重试的次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        //ack配置 消息确认机制
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        //重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);

        //消息压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

        //2.生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        //封装发送的消息
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("itheima-topic", "100001", "hello kafka");
        //3.同步发送消息
//        RecordMetadata recordMetadata = producer.send(record).get();
//        System.out.println(recordMetadata.offset());
        //3.异步发送消息
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    System.out.println("记录异常信息到日志表中");
                }
                System.out.println(recordMetadata.offset());
            }
        });
        //4.关闭消息通道，必须关闭，否则消息发送不成功
        producer.close();
    }

}
